Skip to content

Commit 9554c01

Browse files
committed
~
1 parent 17056f8 commit 9554c01

File tree

6 files changed

+74
-52
lines changed

6 files changed

+74
-52
lines changed

kotlinx-coroutines-core/jvm/src/EventLoop.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit
103103
*/
104104
@InternalCoroutinesApi
105105
@DelicateCoroutinesApi
106-
public fun runSingleTaskFromCurrentSystemDispatcher(): Long {
106+
@PublishedApi
107+
internal fun runSingleTaskFromCurrentSystemDispatcher(): Long {
107108
val thread = Thread.currentThread()
108109
if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread")
109110
return thread.runSingleTask()
@@ -119,7 +120,8 @@ public fun runSingleTaskFromCurrentSystemDispatcher(): Long {
119120
*/
120121
@InternalCoroutinesApi
121122
@DelicateCoroutinesApi
122-
public fun Thread.isIoDispatcherThread(): Boolean {
123+
@PublishedApi
124+
internal fun Thread.isIoDispatcherThread(): Boolean {
123125
if (this !is CoroutineScheduler.Worker) return false
124126
return isIo()
125127
}

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+2
Original file line numberDiff line numberDiff line change
@@ -906,12 +906,14 @@ internal class CoroutineScheduler(
906906
return findBlockingTask()
907907
}
908908

909+
// NB: ONLY for runSingleTask method
909910
private fun findBlockingTask(): Task? {
910911
return localQueue.pollBlocking()
911912
?: globalBlockingQueue.removeFirstOrNull()
912913
?: trySteal(STEAL_BLOCKING_ONLY)
913914
}
914915

916+
// NB: ONLY for runSingleTask method
915917
private fun findCpuTask(): Task? {
916918
return localQueue.pollCpu()
917919
?: globalBlockingQueue.removeFirstOrNull()

kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt

+6-4
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,12 @@ internal class WorkQueue {
112112
*
113113
* Returns [NOTHING_TO_STEAL] if queue has nothing to steal, [TASK_STOLEN] if at least task was stolen
114114
* or positive value of how many nanoseconds should pass until the head of this queue will be available to steal.
115+
*
116+
* [StealingMode] controls what tasks to steal:
117+
* * [STEAL_ANY] is default mode for scheduler, task from the head (in FIFO order) is stolen
118+
* * [STEAL_BLOCKING_ONLY] is mode for stealing *an arbitrary* blocking task which is used by scheduler when helping in Dispatchers.IO mode
119+
* * [STEAL_CPU_ONLY] is a kludge for `runSingleTaskFromCurrentSystemDispatcher`
115120
*/
116-
// TODO move it to tests where appropriate
117-
fun trySteal(stolenTaskRef: ObjectRef<Task?>): Long = trySteal(STEAL_ANY, stolenTaskRef)
118-
119121
fun trySteal(stealingMode: StealingMode, stolenTaskRef: ObjectRef<Task?>): Long {
120122
val task = when (stealingMode) {
121123
STEAL_ANY -> pollBuffer()
@@ -168,7 +170,7 @@ internal class WorkQueue {
168170
return pollWithMode(onlyBlocking = false /* only cpu */)
169171
}
170172

171-
private fun pollWithMode(/* Only blocking OR only CPU */onlyBlocking: Boolean): Task? {
173+
private fun pollWithMode(/* Only blocking OR only CPU */ onlyBlocking: Boolean): Task? {
172174
val start = consumerIndex.value
173175
var end = producerIndex.value
174176
// CPU or (BLOCKING & hasBlocking)

kotlinx-coroutines-core/jvm/test/TestBase.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public actual typealias TestResult = Unit
5656
*/
5757
public actual open class TestBase(private var disableOutCheck: Boolean) {
5858

59-
actual constructor(): this(true)
59+
actual constructor(): this(false)
6060

6161
public actual val isBoundByJsTestTimeout = false
6262
private var actionIndex = AtomicInteger()

kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt

+59-45
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import kotlinx.coroutines.internal.AVAILABLE_PROCESSORS
99
import org.junit.Test
1010
import java.util.*
1111
import java.util.concurrent.ConcurrentHashMap
12+
import java.util.concurrent.CountDownLatch
1213
import java.util.concurrent.CyclicBarrier
1314
import java.util.concurrent.atomic.AtomicInteger
1415
import kotlin.random.*
@@ -18,63 +19,76 @@ import kotlin.time.*
1819

1920
class CoroutineSchedulerInternalApiStressTest : TestBase() {
2021

21-
@Test
22-
fun testHelpDefaultIoIsIsolated() {
23-
repeat(10) {
24-
runTest {
25-
val jobToComplete = Job()
26-
val tasksToCompleteJob = AtomicInteger(200)
22+
@Test(timeout = 120_000L)
23+
fun testHelpDefaultIoIsIsolated() = repeat(100 * stressTestMultiplierSqrt) {
24+
val ioTaskMarker = ThreadLocal.withInitial { false }
25+
var threadThatShould = Thread.currentThread()
26+
runTest {
27+
val jobToComplete = Job()
28+
val expectedIterations = 100
29+
val completionLatch = CountDownLatch(1)
30+
val tasksToCompleteJob = AtomicInteger(expectedIterations)
31+
val observedIoThreads = Collections.newSetFromMap(ConcurrentHashMap<Thread, Boolean>())
32+
val observedDefaultThreads = Collections.newSetFromMap(ConcurrentHashMap<Thread, Boolean>())
2733

28-
val observedIoThreads = Collections.newSetFromMap(ConcurrentHashMap<Thread, Boolean>())
29-
val observedDefaultThreads = Collections.newSetFromMap(ConcurrentHashMap<Thread, Boolean>())
30-
31-
val barrier = CyclicBarrier(AVAILABLE_PROCESSORS)
32-
repeat(AVAILABLE_PROCESSORS - 1) {
33-
// Launch CORES - 1 spawners
34-
launch(Dispatchers.Default) {
35-
barrier.await()
36-
while (!jobToComplete.isCompleted) {
37-
launch {
38-
observedDefaultThreads.add(Thread.currentThread())
39-
val tasksLeft = tasksToCompleteJob.decrementAndGet()
40-
if (tasksLeft == 0) {
41-
// Verify threads first
42-
try {
43-
assertFalse(observedIoThreads.containsAll(observedDefaultThreads))
44-
} finally {
45-
jobToComplete.complete()
46-
}
34+
val barrier = CyclicBarrier(AVAILABLE_PROCESSORS)
35+
val spawners = ArrayList<Job>()
36+
repeat(AVAILABLE_PROCESSORS - 1) {
37+
// Launch CORES - 1 spawners
38+
spawners += launch(Dispatchers.Default) {
39+
barrier.await()
40+
repeat(expectedIterations) {
41+
launch {
42+
val tasksLeft = tasksToCompleteJob.decrementAndGet()
43+
if (tasksLeft < 0) return@launch // Leftovers are being executed all over the place
44+
if (threadThatShould !== Thread.currentThread()) {
45+
val a = 2
46+
}
47+
observedDefaultThreads.add(Thread.currentThread())
48+
if (tasksLeft == 0) {
49+
// Verify threads first
50+
try {
51+
assertFalse(observedIoThreads.containsAll(observedDefaultThreads))
52+
} finally {
53+
jobToComplete.complete()
4754
}
4855
}
56+
}
4957

50-
// Sometimes launch an IO task
51-
if (Random.nextInt(0..9) == 0) {
52-
launch(Dispatchers.IO) {
53-
observedIoThreads.add(Thread.currentThread())
54-
assertTrue(Thread.currentThread().isIoDispatcherThread())
55-
}
58+
// Sometimes launch an IO task to mess with a scheduler
59+
if (Random.nextInt(0..9) == 0) {
60+
launch(Dispatchers.IO) {
61+
ioTaskMarker.set(true)
62+
observedIoThreads.add(Thread.currentThread())
63+
assertTrue(Thread.currentThread().isIoDispatcherThread())
5664
}
5765
}
5866
}
67+
completionLatch.await()
5968
}
69+
}
6070

61-
withContext(Dispatchers.Default) {
62-
barrier.await()
63-
64-
while (!jobToComplete.isCompleted) {
65-
val result = runSingleTaskFromCurrentSystemDispatcher()
66-
if (result == 0L) {
67-
continue
68-
} else if (result >= 0L) {
69-
delay(result.toDuration(DurationUnit.NANOSECONDS))
70-
} else {
71-
delay(10)
72-
}
71+
withContext(Dispatchers.Default) {
72+
threadThatShould = Thread.currentThread()
73+
barrier.await()
74+
var timesHelped = 0
75+
while (!jobToComplete.isCompleted) {
76+
val result = runSingleTaskFromCurrentSystemDispatcher()
77+
assertFalse(ioTaskMarker.get())
78+
if (result == 0L) {
79+
++timesHelped
80+
continue
81+
} else if (result >= 0L) {
82+
Thread.sleep(result.toDuration(DurationUnit.NANOSECONDS).toDelayMillis())
83+
} else {
84+
Thread.sleep(10)
7385
}
74-
assertTrue(Thread.currentThread() in observedDefaultThreads)
7586
}
76-
coroutineContext.job.children.toList().joinAll()
87+
completionLatch.countDown()
88+
// assertEquals(100, timesHelped)
89+
// assertTrue(Thread.currentThread() in observedDefaultThreads, observedDefaultThreads.toString())
7790
}
7891
}
7992
}
8093
}
94+

kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt

+2
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,5 @@ internal fun GlobalQueue.drain(): List<Long> {
107107
}
108108
return result
109109
}
110+
111+
internal fun WorkQueue.trySteal(stolenTaskRef: ObjectRef<Task?>): Long = trySteal(STEAL_ANY, stolenTaskRef)

0 commit comments

Comments
 (0)