Skip to content

Commit f2de98f

Browse files
committed
Fix shutdown sequence for CoroutineScheduler's views
Fixes #678
1 parent f5126d0 commit f2de98f

File tree

3 files changed

+40
-10
lines changed

3 files changed

+40
-10
lines changed

core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt

+10-3
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ internal class CoroutineScheduler(
237237
private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
238238
private inline fun blockingWorkers(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
239239

240+
// Guarded by synchronization
240241
private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
241242
private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())
242243

@@ -340,7 +341,7 @@ internal class CoroutineScheduler(
340341
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
341342
timeSource.trackTask() // this is needed for virtual time support
342343
// TODO at some point make DispatchTask extend Task and make its field settable to save an allocation
343-
val task = Task(block, schedulerTimeSource.nanoTime(), taskContext)
344+
val task = createTask(block, taskContext)
344345
// try to submit the task to the local queue and act depending on the result
345346
when (submitToLocalQueue(task, fair)) {
346347
ADDED -> return
@@ -356,6 +357,8 @@ internal class CoroutineScheduler(
356357
}
357358
}
358359

360+
internal fun createTask(block: Runnable, taskContext: TaskContext) = Task(block, schedulerTimeSource.nanoTime(), taskContext)
361+
359362
/**
360363
* Unparks or creates a new [Worker] for executing non-blocking tasks if there are idle cores
361364
*/
@@ -764,8 +767,12 @@ internal class CoroutineScheduler(
764767
private fun afterTask(task: Task) {
765768
if (task.mode != TaskMode.NON_BLOCKING) {
766769
decrementBlockingWorkers()
767-
assert(state == WorkerState.BLOCKING) { "Expected BLOCKING state, but has $state" }
768-
state = WorkerState.RETIRING
770+
val currentState = state
771+
// Shutdown sequence of blocking dispatcher
772+
if (currentState !== WorkerState.TERMINATED) {
773+
assert(currentState == WorkerState.BLOCKING) { "Expected BLOCKING state, but has $currentState" }
774+
state = WorkerState.RETIRING
775+
}
769776
}
770777
}
771778

core/kotlinx-coroutines-core/src/scheduling/Dispatcher.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ open class ExperimentalCoroutineDispatcher(
101101
try {
102102
coroutineScheduler.dispatch(block, context, fair)
103103
} catch (e: RejectedExecutionException) {
104-
DefaultExecutor.execute(block)
104+
// Context shouldn't be lost here to properly invoke before/after task
105+
DefaultExecutor.execute(coroutineScheduler.createTask(block, context))
105106
}
106107

107108
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs)

core/kotlinx-coroutines-core/test/scheduling/CoroutineSchedulerCloseStressTest.kt

+28-6
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,29 @@ package kotlinx.coroutines.experimental.scheduling
77
import kotlinx.atomicfu.*
88
import kotlinx.coroutines.experimental.*
99
import org.junit.Test
10+
import org.junit.runner.*
11+
import org.junit.runners.*
1012
import java.util.*
13+
import java.util.concurrent.*
1114
import kotlin.test.*
1215

13-
class CoroutineSchedulerCloseStressTest : TestBase() {
16+
@RunWith(Parameterized::class)
17+
class CoroutineSchedulerCloseStressTest(private val mode: Mode) : TestBase() {
18+
enum class Mode { CPU, BLOCKING, CPU_LIMITED }
19+
20+
companion object {
21+
@Parameterized.Parameters(name = "mode={0}")
22+
@JvmStatic
23+
fun params(): Collection<Array<Any>> = Mode.values().map { arrayOf<Any>(it) }
24+
}
25+
1426
private val N_REPEAT = 2 * stressTestMultiplier
1527
private val MAX_LEVEL = 5
1628
private val N_COROS = (1 shl (MAX_LEVEL + 1)) - 1
1729
private val N_THREADS = 4
1830
private val rnd = Random()
1931

32+
private lateinit var closeableDispatcher: ExperimentalCoroutineDispatcher
2033
private lateinit var dispatcher: ExecutorCoroutineDispatcher
2134
private var closeIndex = -1
2235

@@ -28,7 +41,7 @@ class CoroutineSchedulerCloseStressTest : TestBase() {
2841
try {
2942
launchCoroutines()
3043
} finally {
31-
dispatcher.close()
44+
closeableDispatcher.close()
3245
}
3346
}
3447

@@ -41,7 +54,12 @@ class CoroutineSchedulerCloseStressTest : TestBase() {
4154
}
4255

4356
private fun launchCoroutines() = runBlocking {
44-
dispatcher = ExperimentalCoroutineDispatcher(N_THREADS)
57+
closeableDispatcher = ExperimentalCoroutineDispatcher(N_THREADS)
58+
dispatcher = when (mode) {
59+
Mode.CPU -> closeableDispatcher
60+
Mode.CPU_LIMITED -> closeableDispatcher.limited(N_THREADS) as ExecutorCoroutineDispatcher
61+
Mode.BLOCKING -> closeableDispatcher.blocking(N_THREADS) as ExecutorCoroutineDispatcher
62+
}
4563
started.value = 0
4664
finished.value = 0
4765
withContext(dispatcher) {
@@ -54,15 +72,19 @@ class CoroutineSchedulerCloseStressTest : TestBase() {
5472
private fun CoroutineScope.launchChild(index: Int, level: Int): Job = launch(start = CoroutineStart.ATOMIC) {
5573
started.incrementAndGet()
5674
try {
57-
if (index == closeIndex) dispatcher.close()
75+
if (index == closeIndex) closeableDispatcher.close()
5876
if (level < MAX_LEVEL) {
5977
launchChild(2 * index + 1, level + 1)
6078
launchChild(2 * index + 2, level + 1)
6179
} else {
62-
delay(1000)
80+
if (rnd.nextBoolean()) {
81+
delay(1000)
82+
} else {
83+
yield()
84+
}
6385
}
6486
} finally {
6587
finished.incrementAndGet()
6688
}
6789
}
68-
}
90+
}

0 commit comments

Comments
 (0)