-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathSchedulerTestBase.kt
108 lines (90 loc) · 3.64 KB
/
SchedulerTestBase.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("UNUSED_VARIABLE")
package kotlinx.coroutines.scheduling
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import org.junit.*
import kotlin.coroutines.*
import kotlin.test.*
abstract class SchedulerTestBase : TestBase() {
companion object {
val CORES_COUNT = AVAILABLE_PROCESSORS
/**
* Asserts that [expectedThreadsCount] pool worker threads were created.
* Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking
*/
fun checkPoolThreadsCreated(expectedThreadsCount: Int = CORES_COUNT) {
val threadsCount = maxSequenceNumber()!!
assertEquals(expectedThreadsCount, threadsCount, "Expected $expectedThreadsCount pool threads, but has $threadsCount")
}
/**
* Asserts that any number of pool worker threads in [range] were created.
* Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking
*/
fun checkPoolThreadsCreated(range: IntRange, base: Int = CORES_COUNT) {
val maxSequenceNumber = maxSequenceNumber()!!
val r = (range.first)..(range.last + base)
assertTrue(
maxSequenceNumber in r,
"Expected pool threads to be in interval $r, but has $maxSequenceNumber"
)
}
/**
* Asserts that any number of pool worker threads in [range] exists at the time of method invocation
*/
fun checkPoolThreadsExist(range: IntRange) {
val threads = Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker }.count()
assertTrue(threads in range, "Expected threads in $range interval, but has $threads")
}
private fun maxSequenceNumber(): Int? {
return Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker }
.map { sequenceNumber(it.name) }.maxOrNull()
}
private fun sequenceNumber(threadName: String): Int {
val suffix = threadName.substring(threadName.lastIndexOf("-") + 1)
val separatorIndex = suffix.indexOf(' ')
if (separatorIndex == -1) {
return suffix.toInt()
}
return suffix.substring(0, separatorIndex).toInt()
}
suspend fun Iterable<Job>.joinAll() = forEach { it.join() }
}
protected var corePoolSize = CORES_COUNT
protected var maxPoolSize = 1024
protected var idleWorkerKeepAliveNs = IDLE_WORKER_KEEP_ALIVE_NS
private var _dispatcher: ExperimentalCoroutineDispatcher? = null
protected val dispatcher: CoroutineDispatcher
get() {
if (_dispatcher == null) {
_dispatcher = ExperimentalCoroutineDispatcher(
corePoolSize,
maxPoolSize,
idleWorkerKeepAliveNs
)
}
return _dispatcher!!
}
protected var blockingDispatcher = lazy {
blockingDispatcher(1000)
}
protected fun blockingDispatcher(parallelism: Int): CoroutineDispatcher {
val intitialize = dispatcher
return _dispatcher!!.blocking(parallelism)
}
protected fun view(parallelism: Int): CoroutineDispatcher {
val intitialize = dispatcher
return _dispatcher!!.limited(parallelism)
}
@After
fun after() {
runBlocking {
withTimeout(5_000) {
_dispatcher?.close()
}
}
}
}