Skip to content

Commit ce6705c

Browse files
committed
Don't allocate threads on every dispatch in Native's thread pools
1 parent eb21974 commit ce6705c

File tree

3 files changed

+123
-16
lines changed

3 files changed

+123
-16
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
package kotlinx.coroutines
5+
6+
import kotlinx.atomicfu.*
7+
import kotlin.coroutines.*
8+
import kotlin.test.*
9+
10+
class MultithreadedDispatcherStressTest {
11+
12+
private val n = (if (isNative) 1_000 else 10_000) * stressTestMultiplier
13+
val shared = atomic(0)
14+
15+
/**
16+
* Tests that [newFixedThreadPoolContext] will not drop tasks when closed.
17+
*/
18+
@Test
19+
fun testClosingNotDroppingTasks() {
20+
repeat(7) {
21+
shared.value = 0
22+
val nThreads = it + 1
23+
val dispatcher = newFixedThreadPoolContext(nThreads, "testMultiThreadedContext")
24+
repeat(n) {
25+
dispatcher.dispatch(EmptyCoroutineContext, Runnable {
26+
shared.incrementAndGet()
27+
})
28+
}
29+
dispatcher.close()
30+
val m = shared.value
31+
assertEquals(n, m, "$nThreads threads")
32+
}
33+
}
34+
}

kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt

+54-16
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.atomicfu.*
78
import kotlinx.coroutines.channels.*
89
import kotlinx.coroutines.internal.*
910
import kotlin.coroutines.*
@@ -73,38 +74,75 @@ private class MultiWorkerDispatcher(
7374
workersCount: Int
7475
) : CloseableCoroutineDispatcher() {
7576
private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
77+
private val availableWorkers = Channel<Channel<Runnable>>(Channel.UNLIMITED)
7678
private val workerPool = OnDemandAllocatingPool(workersCount) {
7779
Worker.start(name = "$name-$it").apply {
7880
executeAfter { workerRunLoop() }
7981
}
8082
}
8183

84+
/**
85+
* (number of tasks - number of workers) * 2 + (1 if closed)
86+
*/
87+
private val tasksAndWorkersCounter = atomic(0L)
88+
89+
private inline fun Long.isClosed() = this and 1L == 1L
90+
private inline fun Long.hasTasks() = this >= 2
91+
private inline fun Long.hasWorkers() = this < 0
92+
8293
private fun workerRunLoop() = runBlocking {
83-
// NB: we leverage tail-call optimization in this loop, do not replace it with
84-
// .receive() without proper evaluation
85-
for (task in tasksQueue) {
86-
/**
87-
* Any unhandled exception here will pass through worker's boundary and will be properly reported.
88-
*/
89-
task.run()
94+
val privateChannel = Channel<Runnable>(1)
95+
while (true) {
96+
val state = tasksAndWorkersCounter.getAndUpdate {
97+
if (it.isClosed() && !it.hasTasks()) return@runBlocking
98+
it - 2
99+
}
100+
if (state.hasTasks()) {
101+
// we promised to process a task, and there are some
102+
tasksQueue.receive().run()
103+
} else {
104+
availableWorkers.send(privateChannel)
105+
val task = privateChannel.receiveCatching().getOrNull()?.run()
106+
}
90107
}
91108
}
92109

93-
override fun dispatch(context: CoroutineContext, block: Runnable) {
94-
fun throwClosed(block: Runnable) {
95-
throw IllegalStateException("Dispatcher $name was closed, attempted to schedule: $block")
110+
private fun obtainWorker(): Channel<Runnable> {
111+
// spin loop until a worker that promised to be here actually arrives.
112+
while (true) {
113+
val result = availableWorkers.tryReceive()
114+
return result.getOrNull() ?: continue
96115
}
116+
}
97117

98-
if (!workerPool.allocate()) throwClosed(block) // Do not even try to send to avoid race
99-
100-
tasksQueue.trySend(block).onClosed {
101-
throwClosed(block)
118+
override fun dispatch(context: CoroutineContext, block: Runnable) {
119+
val state = tasksAndWorkersCounter.getAndUpdate {
120+
if (it.isClosed())
121+
throw IllegalStateException("Dispatcher $name was closed, attempted to schedule: $block")
122+
it + 2
123+
}
124+
if (state.hasWorkers()) {
125+
// there are workers that have nothing to do, let's grab one of them
126+
obtainWorker().trySend(block)
127+
} else {
128+
workerPool.allocate()
129+
// no workers are available, we must queue the task
130+
tasksQueue.trySend(block)
102131
}
103132
}
104133

105134
override fun close() {
106-
val workers = workerPool.close()
107-
tasksQueue.close()
135+
tasksAndWorkersCounter.getAndUpdate { if (it.isClosed()) it else it or 1L }
136+
val workers = workerPool.close() // no new workers will be created
137+
loop@while (true) {
138+
// check if there are workers that await tasks in their personal channels, we need to wake them up
139+
val state = tasksAndWorkersCounter.getAndUpdate {
140+
if (it.hasWorkers()) it + 2 else it
141+
}
142+
if (!state.hasWorkers())
143+
break
144+
obtainWorker().close()
145+
}
108146
/*
109147
* Here we cannot avoid waiting on `.result`, otherwise it will lead
110148
* to a native memory leak, including a pthread handle.

kotlinx-coroutines-core/native/test/WorkerTest.kt

+35
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,39 @@ class WorkerTest : TestBase() {
6262
finished.receive()
6363
}
6464
}
65+
66+
/**
67+
* Test that [newFixedThreadPoolContext] does not allocate more dispatchers than it needs to.
68+
* Incidentally also tests that it will allocate enough workers for its needs. Otherwise, the test will hang.
69+
*/
70+
@Test
71+
fun testNotAllocatingExtraDispatchers() {
72+
suspend fun spin(set: MutableSet<Worker>) {
73+
repeat(100) {
74+
set.add(Worker.current)
75+
delay(1)
76+
}
77+
}
78+
val dispatcher = newFixedThreadPoolContext(64, "test")
79+
try {
80+
runBlocking {
81+
val encounteredWorkers = mutableSetOf<Worker>()
82+
var canStart = false
83+
val coroutine1 = launch(dispatcher) {
84+
while (!canStart) {
85+
// intentionally empty
86+
}
87+
spin(encounteredWorkers)
88+
}
89+
val coroutine2 = launch(dispatcher) {
90+
canStart = true
91+
spin(encounteredWorkers)
92+
}
93+
listOf(coroutine1, coroutine2).joinAll()
94+
assertEquals(2, encounteredWorkers.size)
95+
}
96+
} finally {
97+
dispatcher.close()
98+
}
99+
}
65100
}

0 commit comments

Comments
 (0)