Skip to content

Commit 704841b

Browse files
elizarovpablobaxter
authored andcommitted
Optimization: resizable workers array (Kotlin#3137)
Instead of allocating an array of maxPoolSize (~2M) elements for the worst-case supported scenario that may never be reached in practice and takes considerable memory, allocate just an array of corePoolSize elements and grow it dynamically if needed to accommodate more workers. The data structure to make it happen must support lock-free reads for performance reasons, but it is simple since the workers array is modified exclusively under synchronization.
1 parent 15e969e commit 704841b

File tree

3 files changed

+70
-6
lines changed

3 files changed

+70
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.internal
6+
7+
import java.util.concurrent.atomic.*
8+
9+
/**
10+
* Atomic array with lock-free reads and synchronized modifications. It logically has an unbounded size,
11+
* is implicitly filled with nulls, and is resized on updates as needed to grow.
12+
*/
13+
internal class ResizableAtomicArray<T>(initialLength: Int) {
14+
@Volatile
15+
private var array = AtomicReferenceArray<T>(initialLength)
16+
17+
// for debug output
18+
public fun currentLength(): Int = array.length()
19+
20+
public operator fun get(index: Int): T? {
21+
val array = this.array // volatile read
22+
return if (index < array.length()) array[index] else null
23+
}
24+
25+
// Must not be called concurrently, e.g. always use synchronized(this) to call this function
26+
fun setSynchronized(index: Int, value: T?) {
27+
val curArray = this.array
28+
val curLen = curArray.length()
29+
if (index < curLen) {
30+
curArray[index] = value
31+
} else {
32+
val newArray = AtomicReferenceArray<T>((index + 1).coerceAtLeast(2 * curLen))
33+
for (i in 0 until curLen) newArray[i] = curArray[i]
34+
newArray[index] = value
35+
array = newArray // copy done
36+
}
37+
}
38+
}

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+5-6
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import kotlinx.coroutines.*
99
import kotlinx.coroutines.internal.*
1010
import java.io.*
1111
import java.util.concurrent.*
12-
import java.util.concurrent.atomic.*
1312
import java.util.concurrent.locks.*
1413
import kotlin.math.*
1514
import kotlin.random.*
@@ -261,7 +260,7 @@ internal class CoroutineScheduler(
261260
* works properly
262261
*/
263262
@JvmField
264-
val workers = AtomicReferenceArray<Worker?>(maxPoolSize + 1)
263+
val workers = ResizableAtomicArray<Worker>(corePoolSize + 1)
265264

266265
/**
267266
* Long describing state of workers in this pool.
@@ -480,7 +479,7 @@ internal class CoroutineScheduler(
480479
* 3) Only then start the worker, otherwise it may miss its own creation
481480
*/
482481
val worker = Worker(newIndex)
483-
workers[newIndex] = worker
482+
workers.setSynchronized(newIndex, worker)
484483
require(newIndex == incrementCreatedWorkers())
485484
worker.start()
486485
return cpuWorkers + 1
@@ -525,7 +524,7 @@ internal class CoroutineScheduler(
525524
var dormant = 0
526525
var terminated = 0
527526
val queueSizes = arrayListOf<String>()
528-
for (index in 1 until workers.length()) {
527+
for (index in 1 until workers.currentLength()) {
529528
val worker = workers[index] ?: continue
530529
val queueSize = worker.localQueue.size
531530
when (worker.state) {
@@ -838,7 +837,7 @@ internal class CoroutineScheduler(
838837
val lastIndex = decrementCreatedWorkers()
839838
if (lastIndex != oldIndex) {
840839
val lastWorker = workers[lastIndex]!!
841-
workers[oldIndex] = lastWorker
840+
workers.setSynchronized(oldIndex, lastWorker)
842841
lastWorker.indexInArray = oldIndex
843842
/*
844843
* Now lastWorker is available at both indices in the array, but it can
@@ -852,7 +851,7 @@ internal class CoroutineScheduler(
852851
/*
853852
* 5) It is safe to clear reference from workers array now.
854853
*/
855-
workers[lastIndex] = null
854+
workers.setSynchronized(lastIndex, null)
856855
}
857856
state = WorkerState.TERMINATED
858857
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.lincheck
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.internal.*
9+
import org.jetbrains.kotlinx.lincheck.annotations.*
10+
import org.jetbrains.kotlinx.lincheck.paramgen.*
11+
12+
@Param(name = "index", gen = IntGen::class, conf = "0:4")
13+
@Param(name = "value", gen = IntGen::class, conf = "1:5")
14+
@OpGroupConfig(name = "sync", nonParallel = true)
15+
class ResizableAtomicArrayLincheckTest : AbstractLincheckTest() {
16+
private val a = ResizableAtomicArray<Int>(2)
17+
18+
@Operation
19+
fun get(@Param(name = "index") index: Int): Int? = a[index]
20+
21+
@Operation(group = "sync")
22+
fun set(@Param(name = "index") index: Int, @Param(name = "value") value: Int) {
23+
a.setSynchronized(index, value)
24+
}
25+
26+
override fun extractState() = (0..4).map { a[it] }
27+
}

0 commit comments

Comments
 (0)