Skip to content

Fixed hangs during thread termination in IO dispatcher #544

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 21 additions & 23 deletions core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -111,21 +111,19 @@ internal class CoroutineScheduler(
*
* Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]).
*/
private fun parkedWorkersStackTopUpdate(oldIndex: Int, newIndex: Int) {
private fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) {
parkedWorkersStack.loop { top ->
val index = (top and PARKED_INDEX_MASK).toInt()
val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
val updIndex = if (index == oldIndex) {
if (newIndex == 0) {
parkedWorkersStackNextIndex(workers[oldIndex]!!)
}
else {
parkedWorkersStackNextIndex(worker)
} else {
newIndex
}
} else {
index // no change to index, but update version
}

if (updIndex < 0) return@loop // retry
if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return
}
Expand Down Expand Up @@ -254,7 +252,7 @@ internal class CoroutineScheduler(
private const val MAX_SPINS = 1000
private const val MAX_YIELDS = MAX_SPINS + 500

@JvmStatic // Note, that is fits into Int (it is is equal to 10^9)
@JvmStatic // Note that it fits into Int (it is equal to 10^9)
private val MAX_PARK_TIME_NS = TimeUnit.SECONDS.toNanos(1).toInt()

@JvmStatic
Expand Down Expand Up @@ -824,10 +822,7 @@ internal class CoroutineScheduler(
* See tryUnpark for state reasoning.
* If this CAS fails, then we were successfully unparked by other worker and cannot terminate.
*/
if (!terminationState.compareAndSet(
ALLOWED,
TERMINATED
)) return
if (!terminationState.compareAndSet(ALLOWED, TERMINATED)) return
/*
* At this point this thread is no longer considered as usable for scheduling.
* We need multi-step choreography to reindex workers.
Expand All @@ -843,22 +838,25 @@ internal class CoroutineScheduler(
* 2) Update top of stack if it was pointing to oldIndex and make sure no
* pending push/pop operation that might have already retrieved oldIndex could complete.
*/
parkedWorkersStackTopUpdate(oldIndex, 0)
parkedWorkersStackTopUpdate(this, oldIndex, 0)
/*
* 3) Move last worker into an index in array that was previously occupied by this worker.
* 3) Move last worker into an index in array that was previously occupied by this worker,
* if last worker was a different one (sic!).
*/
val lastIndex = decrementCreatedWorkers()
val lastWorker = workers[lastIndex]!!
workers[oldIndex] = lastWorker
lastWorker.indexInArray = oldIndex
/*
* Now lastWorker is available at both indices in the array, but it can
* still be at the stack top via its lastIndex
*
* 4) Update top of stack lastIndex -> oldIndex and make sure no
* pending push/pop operation that might have already retrieved lastIndex could complete.
*/
parkedWorkersStackTopUpdate(lastIndex, oldIndex)
if (lastIndex != oldIndex) {
val lastWorker = workers[lastIndex]!!
workers[oldIndex] = lastWorker
lastWorker.indexInArray = oldIndex
/*
* Now lastWorker is available at both indices in the array, but it can
* still be at the stack top via its lastIndex
*
* 4) Update top of stack lastIndex -> oldIndex and make sure no
* pending push/pop operation that might have already retrieved lastIndex could complete.
*/
parkedWorkersStackTopUpdate(lastWorker, lastIndex, oldIndex)
}
/*
* 5) It is safe to clear reference from workers array now.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.experimental

import kotlinx.coroutines.experimental.scheduling.*
import org.junit.*
import java.util.*
import java.util.concurrent.*

/**
 * Stress test that keeps submitting small bursts of short blocking tasks to the
 * blocking view of an [ExperimentalCoroutineDispatcher], pausing for a random
 * interval between bursts.
 *
 * There are no explicit assertions: the test succeeds when the submission loop
 * reaches its deadline and [tearDown] closes the dispatcher.
 */
class BlockingIOTerminationStressTest : TestBase() {
    // NOTE(review): the positional arguments are presumably core pool size, max pool size
    // and idle-worker keep-alive in nanoseconds (10 ms here) — confirm against the constructor.
    private val scheduler = ExperimentalCoroutineDispatcher(2, 20, TimeUnit.MILLISECONDS.toNanos(10))
    private val blockingDispatcher = scheduler.blocking()

    // Wall-clock duration of the test, scaled by the stress-test multiplier from the base class.
    private val durationSeconds = 3L * stressTestMultiplier

    /** Closes the underlying dispatcher after each test. */
    @After
    fun tearDown() {
        scheduler.close()
    }

    /**
     * Until the deadline passes: sleep 0..29 ms, then launch 1..5 coroutines on the
     * blocking dispatcher, each of which blocks its thread for 0..4 ms.
     */
    @Test
    fun testTermination() {
        val random = Random()
        val stopAtMillis = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(durationSeconds)
        while (System.currentTimeMillis() < stopAtMillis) {
            // Random pause between bursts.
            val pauseMillis = random.nextInt(30).toLong()
            Thread.sleep(pauseMillis)

            // Burst size is drawn once, before any task of the burst is launched.
            val burstSize = random.nextInt(5) + 1
            for (task in 0 until burstSize) {
                launch(blockingDispatcher) {
                    val blockMillis = random.nextInt(5).toLong()
                    Thread.sleep(blockMillis)
                }
            }
        }
    }
}