Skip to content

Commit 241e74e

Browse files
committed
CoroutineScheduler: fixed race between createNewWorker & shutdown for tests
1 parent 6e3ffb1 commit 241e74e

File tree

1 file changed

+42
-40
lines changed

1 file changed

+42
-40
lines changed

core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt

Lines changed: 42 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ internal class CoroutineScheduler(
246246
private val random = Random()
247247

248248
// This is used a "stop signal" for debugging/tests only
249-
private val isTerminated = atomic(0) // workaround for atomicfu bug
249+
private val isTerminated = atomic(false)
250250

251251
companion object {
252252
private const val MAX_SPINS = 1000
@@ -294,29 +294,21 @@ internal class CoroutineScheduler(
294294

295295
/*
296296
* Shuts down current scheduler and waits until all threads are stopped.
297-
* This method uses unsafe API (unconditional unparks, ignoring interruptions etc.)
297+
* This method uses unsafe API (does unconditional unparks)
298298
* and intended to be used only for testing. Invocation has no additional effect if already closed.
299299
*/
300300
fun shutdown(timeout: Long) {
301-
if (!isTerminated.compareAndSet(0, 1)) return
302-
// Race with recently created threads which may park indefinitely
303-
var finishedThreads = 0
304-
while (finishedThreads < createdWorkers) {
305-
var finished = 0
306-
for (i in 1..createdWorkers) {
307-
synchronized(workers) {
308-
// Get the worker from array and also clear reference in array under synchronization
309-
workers[i].also { workers[i] = null }
310-
} ?.also {
311-
if (it.isAlive) {
312-
// Unparking alive thread is unsafe in general, but acceptable for testing purposes
313-
LockSupport.unpark(it)
314-
it.join(timeout)
315-
}
316-
++finished
317-
}
301+
// atomically set termination flag which is checked when workers are added or removed
302+
if (!isTerminated.compareAndSet(false, true)) return
303+
// Capture # of created workers that cannot change anymore (mind the synchronized block!)
304+
val created = synchronized(workers) { createdWorkers }
305+
for (i in 1..created) {
306+
val worker = workers[i]!!
307+
if (worker.isAlive) {
308+
// Unparking alive thread is unsafe in general, but acceptable for testing purposes
309+
LockSupport.unpark(worker)
310+
worker.join(timeout)
318311
}
319-
finishedThreads = finished
320312
}
321313
// cleanup state to make sure that tryUnpark tries to create new threads and crashes because it isTerminated
322314
assert(cpuPermits.availablePermits() == corePoolSize)
@@ -438,8 +430,8 @@ internal class CoroutineScheduler(
438430
*/
439431
private fun createNewWorker(): Int {
440432
synchronized(workers) {
441-
// for test purposes make sure we're not trying to resurrect terminated scheduler
442-
require(isTerminated.value == 0) { "This scheduler was terminated "}
433+
// Make sure we're not trying to resurrect terminated scheduler
434+
if (isTerminated.value) throw ShutdownException()
443435
val state = controlState.value
444436
val created = createdWorkers(state)
445437
val blocking = blockingWorkers(state)
@@ -456,6 +448,9 @@ internal class CoroutineScheduler(
456448
}
457449
}
458450

451+
// Is thrown when attempting to create new worker, but this scheduler isTerminated
452+
private class ShutdownException : RuntimeException()
453+
459454
/**
460455
* Returns [ADDED], or [NOT_ADDED], or [ADDED_REQUIRES_HELP].
461456
*/
@@ -682,28 +677,33 @@ internal class CoroutineScheduler(
682677
private var lastStealIndex = 0 // try in order repeated, reset when unparked
683678

684679
override fun run() {
685-
var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive
686-
while (isTerminated.value == 0 && state != WorkerState.TERMINATED) {
687-
val task = findTask()
688-
if (task == null) {
689-
// Wait for a job with potential park
690-
if (state == WorkerState.CPU_ACQUIRED) {
691-
cpuWorkerIdle()
680+
try {
681+
var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive
682+
while (!isTerminated.value && state != WorkerState.TERMINATED) {
683+
val task = findTask()
684+
if (task == null) {
685+
// Wait for a job with potential park
686+
if (state == WorkerState.CPU_ACQUIRED) {
687+
cpuWorkerIdle()
688+
} else {
689+
blockingWorkerIdle()
690+
}
691+
wasIdle = true
692692
} else {
693-
blockingWorkerIdle()
694-
}
695-
wasIdle = true
696-
} else {
697-
if (wasIdle) {
698-
idleReset(task.mode)
699-
wasIdle = false
693+
if (wasIdle) {
694+
idleReset(task.mode)
695+
wasIdle = false
696+
}
697+
beforeTask(task)
698+
runSafely(task)
699+
afterTask(task)
700700
}
701-
beforeTask(task)
702-
runSafely(task)
703-
afterTask(task)
704701
}
702+
} catch(e: ShutdownException) {
703+
// race with shutdown -- ignore exception and don't print it on the console
704+
} finally {
705+
tryReleaseCpu(WorkerState.TERMINATED)
705706
}
706-
tryReleaseCpu(WorkerState.TERMINATED)
707707
}
708708

709709
private fun runSafely(task: Task) {
@@ -814,6 +814,8 @@ internal class CoroutineScheduler(
814814
*/
815815
private fun tryTerminateWorker() {
816816
synchronized(workers) {
817+
// Make sure we're not trying race with termination of scheduler
818+
if (isTerminated.value) throw ShutdownException()
817819
// Someone else terminated, bail out
818820
if (createdWorkers <= corePoolSize) return
819821
// Try to find blocking task before termination

0 commit comments

Comments
 (0)